# -*- coding:utf-8 -*-
__author__ = 'gin.chen'

from kafka import KafkaProducer
import json


class KafkaSink(object):

    def __init__(self, host, port, topic):
        self.host = host
        self.port = str(port)
        self.topic = topic
        self.producer = KafkaProducer(key_serializer=lambda k: json.dumps(k).encode('utf-8'),
                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                      bootstrap_servers=self.host + ":" + self.port)

    def send_message(self, data):
        producer = self.producer
        future = producer.send(topic=self.topic, key=data["user_id"], value=data)
        future.get(timeout=10)

    def close(self):
        producer = self.producer
        producer.close()
